{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "63ac8480-7fb1-4964-aa7f-d3770c96ec43",
   "metadata": {},
   "source": [
    "# Python Cookbook, 3rd Ed.\n",
    "\n",
    "Chapter 14, Application Integration: Combination\n",
    "\n",
    "Markov Generator\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "bf08ab5c-e170-4e85-9d0d-25881d4ecd6d",
   "metadata": {},
   "outputs": [],
   "source": [
    "from collections.abc import Callable\n",
    "from typing import TypeAlias, Any\n",
    "import random\n",
    "from functools import partial"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "ad4e445a-4778-4ada-a61b-229c5a3a84c4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# A chain is the list of roll totals accumulated so far.\n",
    "Chain: TypeAlias = list[int]\n",
    "# A state function receives the current chain and the next roll total and\n",
    "# returns the extended chain paired with the next state function. The second\n",
    "# item is typed ``Any`` rather than ``State`` -- presumably to avoid a\n",
    "# self-referential alias.\n",
    "State: TypeAlias = Callable[[Chain, int], tuple[Chain, Any]]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "c954127a-fd2a-4380-aeb0-0b847e3fd0f7",
   "metadata": {},
   "outputs": [],
   "source": [
    "class Succeed(Exception):\n",
    "    \"\"\"Raised by the ``succeed`` state function; its argument is the finished chain.\"\"\"\n",
    "\n",
    "class Fail(Exception):\n",
    "    \"\"\"Raised by the ``fail`` state function; its argument is the finished chain.\"\"\"\n",
    "\n",
    "def start(chain: Chain, roll: int) -> tuple[Chain, State]:\n",
    "    \"\"\"Initial state: classify the first roll and pick the next state.\n",
    "\n",
    "    A roll of 7 or 11 leads to ``succeed``; 2, 3 or 12 leads to ``fail``;\n",
    "    any other total becomes the point that ``grow_until`` tries to repeat.\n",
    "    Returns a new chain (the input list is not modified) and the next state.\n",
    "    \"\"\"\n",
    "    match roll:\n",
    "        case 7 | 11:\n",
    "            next_state = succeed\n",
    "        case 2 | 3 | 12:\n",
    "            next_state = fail\n",
    "        case _:\n",
    "            # Bind the roll as the point; the result still takes (chain, roll).\n",
    "            next_state = partial(grow_until, roll)\n",
    "    return chain + [roll], next_state\n",
    "\n",
    "def grow_until(point: int, chain: Chain, roll: int) -> tuple[Chain, State]:\n",
    "    \"\"\"Intermediate state: keep extending the chain until 7 or ``point`` is rolled.\n",
    "\n",
    "    A 7 leads to ``fail`` (checked first), a repeat of ``point`` leads to\n",
    "    ``succeed``, and anything else stays in this state with the same point.\n",
    "    Returns a new chain (the input list is not modified) and the next state.\n",
    "    \"\"\"\n",
    "    extended = chain + [roll]\n",
    "    if roll == 7:\n",
    "        return extended, fail\n",
    "    if roll == point:\n",
    "        return extended, succeed\n",
    "    return extended, partial(grow_until, point)\n",
    "\n",
    "def succeed(chain: Chain, roll: int) -> tuple[Chain, State]:\n",
    "    \"\"\"Terminal state: never returns; raises ``Succeed`` carrying ``chain``.\n",
    "\n",
    "    ``roll`` is accepted only to match the state signature and is ignored.\n",
    "    \"\"\"\n",
    "    finished = Succeed(chain)\n",
    "    raise finished\n",
    "\n",
    "def fail(chain: Chain, roll: int) -> tuple[Chain, State]:\n",
    "    \"\"\"Terminal state: never returns; raises ``Fail`` carrying ``chain``.\n",
    "\n",
    "    ``roll`` is accepted only to match the state signature and is ignored.\n",
    "    \"\"\"\n",
    "    finished = Fail(chain)\n",
    "    raise finished"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "8f0a8b60-cd0e-4227-af1e-f8c75f77b99e",
   "metadata": {},
   "outputs": [],
   "source": [
    "class Dice:\n",
    "    \"\"\"A pair of six-sided dice driven by a private ``random.Random``.\n",
    "\n",
    "    ``seed`` is handed straight to ``random.Random``; the most recent faces\n",
    "    are kept in ``self.dice``.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, seed=None) -> None:\n",
    "        self._rng = random.Random(seed)\n",
    "        # Roll once up front so ``self.dice`` exists before the first explicit roll.\n",
    "        self.roll()\n",
    "\n",
    "    def roll(self) -> tuple[int, ...]:\n",
    "        \"\"\"Roll both dice, store the two faces in ``self.dice`` and return them.\"\"\"\n",
    "        faces = tuple(self._rng.randint(1, 6) for _ in range(2))\n",
    "        self.dice = faces\n",
    "        return faces"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "c1970fff-69b2-4209-8c9b-fef9593bb7cd",
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate(dice: Dice) -> tuple[str, Chain]:\n",
    "    \"\"\"Run the state machine from ``start`` until it terminates.\n",
    "\n",
    "    Returns ``(\"Success\", chain)`` or ``(\"Fail\", chain)``.\n",
    "\n",
    "    The terminal states signal completion by raising when they are *called*,\n",
    "    so one additional roll is taken from ``dice`` and discarded at the end\n",
    "    of every sample.\n",
    "    \"\"\"\n",
    "    state, chain = start, []\n",
    "    try:\n",
    "        while True:\n",
    "            chain, state = state(chain, sum(dice.roll()))\n",
    "    except Succeed:\n",
    "        label = \"Success\"\n",
    "    except Fail:\n",
    "        label = \"Fail\"\n",
    "    return (label, chain)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cb14d791-1bc6-400f-b5eb-6832adddf2be",
   "metadata": {},
   "source": [
    "## Test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "bf9274f2-73ad-4928-90f9-9756c2fa8d47",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('Fail', [9, 10, 8, 7])\n",
      "('Success', [7])\n",
      "('Fail', [9, 5, 10, 6, 12, 7])\n",
      "('Success', [9, 4, 6, 6, 5, 12, 9])\n",
      "('Success', [7])\n",
      "('Fail', [10, 6, 5, 11, 7])\n",
      "('Success', [4, 4])\n",
      "('Success', [11])\n",
      "('Fail', [8, 9, 7])\n",
      "('Fail', [12])\n"
     ]
    }
   ],
   "source": [
    "# A fixed seed makes the sequence of printed samples repeatable.\n",
    "d = Dice(seed=1337)\n",
    "\n",
    "for i in range(10):\n",
    "    print(generate(dice=d))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ead8652a-b954-45c6-b9c5-3f2a3da126ff",
   "metadata": {},
   "source": [
    "## Output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "c20ad411-9d36-4f7c-a420-1f0eba6c80ed",
   "metadata": {},
   "outputs": [],
   "source": [
    "from argparse import Namespace\n",
    "from contextlib import redirect_stdout\n",
    "import csv\n",
    "from pathlib import Path\n",
    "import sys\n",
    "from typing import TextIO"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "09bcf60c-f388-43f6-9eeb-5ec6e357db52",
   "metadata": {},
   "outputs": [],
   "source": [
    "class Writer:\n",
    "    \"\"\"Base class for sample writers; both output methods do nothing here.\n",
    "\n",
    "    Subclasses override ``header`` and ``sample`` to emit a concrete format\n",
    "    to ``self.target``.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, target: TextIO | None = None) -> None:\n",
    "        \"\"\"Remember the output stream; ``None`` means the current ``sys.stdout``.\"\"\"\n",
    "        # Resolve the default at call time. A ``target=sys.stdout`` default is\n",
    "        # evaluated once, when the class is defined, and would keep pointing at\n",
    "        # that stream even after ``sys.stdout`` has been replaced or redirected.\n",
    "        self.target = sys.stdout if target is None else target\n",
    "\n",
    "    def header(self, opts: Namespace, columns: bool = True) -> None:\n",
    "        \"\"\"Write the run configuration taken from ``opts``; no-op in the base class.\"\"\"\n",
    "        ...\n",
    "\n",
    "    def sample(self, outcome: str, chain: list[int]) -> None:\n",
    "        \"\"\"Write one generated sample; no-op in the base class.\"\"\"\n",
    "        ..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "0ceba0e0-5375-4986-b12c-27f0a782febb",
   "metadata": {},
   "outputs": [],
   "source": [
    "class CSVWriter(Writer):\n",
    "    \"\"\"Write samples as CSV rows, preceded by ``#``-prefixed configuration lines.\"\"\"\n",
    "\n",
    "    def __init__(self, target: TextIO) -> None:\n",
    "        super().__init__(target)\n",
    "        # Build the csv writer here rather than in header(), so sample() also\n",
    "        # works after header(columns=False) or with no header() call at all.\n",
    "        self.writer = csv.writer(self.target)\n",
    "\n",
    "    def header(self, opts: Namespace, columns: bool = True) -> None:\n",
    "        \"\"\"Write the configuration comments and, if ``columns``, the column row.\n",
    "\n",
    "        Reads ``opts.file_name``, ``opts.sample_count`` and ``opts.randomize``.\n",
    "        \"\"\"\n",
    "        # print(file=...) writes to the target directly; redirect_stdout would\n",
    "        # swap the process-wide sys.stdout for the duration of the block.\n",
    "        print(f'# file = \"{opts.file_name}\"', file=self.target)\n",
    "        print(f'# samples = {opts.sample_count}', file=self.target)\n",
    "        print(f'# randomize = {opts.randomize}', file=self.target)\n",
    "        if columns:\n",
    "            print('# -----', file=self.target)\n",
    "            self.target.flush()\n",
    "            self.writer.writerow(['outcome', 'length', 'chain'])\n",
    "\n",
    "    def sample(self, outcome: str, chain: list[int]) -> None:\n",
    "        \"\"\"Write one row: outcome, chain length, and the chain joined with ``;``.\"\"\"\n",
    "        self.writer.writerow([outcome, len(chain), ';'.join(map(str, chain))])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "17f3b4e3-cf89-4069-b63d-3e781ebfd79f",
   "metadata": {},
   "outputs": [],
   "source": [
    "class TOMLWriter(Writer):\n",
    "    \"\"\"Write the configuration as a ``[Configuration]`` table and each sample\n",
    "    as a ``[[Samples]]`` entry.\n",
    "\n",
    "    Values are interpolated as-is: strings are not escaped, and a non-TOML\n",
    "    value such as ``None`` is written using its Python ``str()`` form.\n",
    "    \"\"\"\n",
    "\n",
    "    def header(self, opts: Namespace, columns: bool = True) -> None:\n",
    "        \"\"\"Write the configuration table; ``columns`` is accepted but unused.\n",
    "\n",
    "        Reads ``opts.file_name``, ``opts.sample_count`` and ``opts.randomize``.\n",
    "        \"\"\"\n",
    "        # print(file=...) writes to the target directly; redirect_stdout would\n",
    "        # swap the process-wide sys.stdout for the duration of the block.\n",
    "        print(\"[Configuration]\", file=self.target)\n",
    "        print(f'  file = \"{opts.file_name}\"', file=self.target)\n",
    "        print(f'  samples = {opts.sample_count}', file=self.target)\n",
    "        print(f'  randomize = {opts.randomize}', file=self.target)\n",
    "\n",
    "    def sample(self, outcome: str, chain: list[int]) -> None:\n",
    "        \"\"\"Write one ``[[Samples]]`` entry with outcome, length and chain.\"\"\"\n",
    "        print(\"[[Samples]]\", file=self.target)\n",
    "        print(f'  outcome = \"{outcome}\"', file=self.target)\n",
    "        print(f'  length = {len(chain)}', file=self.target)\n",
    "        print(f'  chain = {chain}', file=self.target)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "104cf5da-0a4e-4c98-99ac-bfbfea4b8623",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run configuration: output file name, number of samples, and the dice seed.\n",
    "opts = Namespace(\n",
    "    file_name=\"sample.csv\",\n",
    "    sample_count=100,\n",
    "    randomize=42,\n",
    ")\n",
    "\n",
    "dice = Dice(opts.randomize)\n",
    "target = Path(\"../../data\") / \"ch14\" / opts.file_name\n",
    "\n",
    "# The csv module expects its file to be opened with newline=''. Without it,\n",
    "# platforms that translate newlines on write add an extra '\\r' to each row,\n",
    "# because csv.writer already terminates rows with '\\r\\n'.\n",
    "with target.open('w', newline='') as target_file:\n",
    "    writer = CSVWriter(target_file)\n",
    "    writer.header(opts, columns=True)\n",
    "    for i in range(opts.sample_count):\n",
    "        outcome, chain = generate(dice)\n",
    "        writer.sample(outcome, chain)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f419f2a6-dd9f-47f6-974c-d7d857c12385",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
